-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Filter empty distribution metrics #32027
Conversation
afc0bb3
to
b0d332d
Compare
b0d332d
to
4a18297
Compare
Assigning reviewers. If you would like to opt out of this review, comment R: @tvalentyn for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Run Python_Coverage PreCommit |
4a18297
to
c92ba74
Compare
e17a6fa
to
d1eee16
Compare
d1eee16
to
e84f81d
Compare
all_monitoring_infos[monitoring_infos.to_key( | ||
sampled_byte_count)] = sampled_byte_count | ||
|
||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it make sense to check here if(count > 0)
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -214,6 +214,11 @@ def int64_user_distribution(namespace, name, metric, ptransform=None): | |||
ptransform: The ptransform id used as a label. | |||
""" | |||
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) | |||
if metric.count <= 0: | |||
raise TypeError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not seeing where this exception will be caught, hence same question - can we avoid entering this codepath instead of catching the exception in a try-catch.
ValueError would be more appropriate here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also if exception is uncaught, it can create a breaking change for users, so as much as possible I'd prefer to not enter the exception path or fail silently
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the Dataflow Runner, there are checks when we create the distribution, so that should be fine. For a user defined counter, what behaviour do you want? I agree we shouldn't introduce breaking changes. Is it fine to not emit a counter in that case? If we still want to emit something here, then we'll have an empty counter, and we'd have to filter it out in the runner to prevent sending it to the backend (which we don't want to add either based on your previous comments.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not emitting sounds fine, left some comments.
if metric.count <= 0: | ||
raise TypeError( | ||
'Expected a non zero distribution count for %s metric but received %s' % | ||
(metric, metric.count)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you probably meant metric.name here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
40d4591
to
cf01c43
Compare
cf01c43
to
86eff84
Compare
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) | ||
return create_monitoring_info( | ||
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) | ||
if metric.count <= 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd prefer to lead the condition with the happy path (if metric.count > 0
)
_LOGGER.debug( | ||
'Expected a non zero distribution count for %s metric but received %s' % | ||
(metric.name, metric.count)) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we return None
(implicitly), we need to change the method typehint to # type: (...) -> Optional[metrics_pb2.MonitoringInfo]
.
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) | ||
if metric.count <= 0: | ||
_LOGGER.debug( | ||
'Expected a non zero distribution count for %s metric but received %s' % |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which situation this log message will be actionable? I wonder if we should remove this log if it commonly happens (e.g. retries).
@@ -605,18 +605,23 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): | |||
receiver.opcounter.element_counter.value(), | |||
pcollection=pcollection_id, | |||
) | |||
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check that elem_count_mi
is not None
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, what happens with distribution if the count is 0 after all? For example , pipeline reads from a file, but there are no elements, and pipeline stops. will this case be handled correctly?
@@ -214,6 +214,11 @@ def int64_user_distribution(namespace, name, metric, ptransform=None): | |||
ptransform: The ptransform id used as a label. | |||
""" | |||
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) | |||
if metric.count <= 0: | |||
raise TypeError( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think not emitting sounds fine, left some comments.
So with the current approach of returning None, this fails: ./gradlew :sdks:python:test-suites:tox:pycommon:mypy (part of the Precommig Python Lint test) Essentially the issue is that we have a dict of strings input, which should produce a dict of Monitoring info. Since we aren't raising errors, pyhint fails since it now expects Ideally we do some filtering here, if we went down this approach, which isn't the most performant The other option was to raise an error, gaurd the runner against it, so then if a user creates a invalid distribution, it would error (which is a breaking change, which also isn't desirable) Then which of the two approaches make sense? Is there a third option? |
Reminder, please take a look at this pr: @tvalentyn |
Discussed offline |
waiting on author |
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Filter empty distributions
On the python sdk, filter emtpy distributions so its not sent to the runner for processing, since it adds unnecessary overhead. This can occur when processing failed, or there was an empty split created (for batch pipelines)
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.